package Api.hdfstools;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;

/*
 * @author lzxyzq
 * HDFS一些常用命令，可参照Hadoop API FileSystem
 */
public class HDFSTools {
    public HDFSTools(){ }
    //判断某个文件是否存在hdfs文件系统中已经存在
    public static boolean exists(Configuration config,String path)throws IOException{
        //根据配置文件信息，形成文件系统对象
        FileSystem fs = FileSystem.get(config);
        return fs.exists(new Path(path)); //把文件路径转换成Path
        //判断是否已经存在文件
    }
    //给一个文件名和内容，我们在hdfs中创建文件
    public static void createFile(Configuration config,String path,byte[] buf)throws IOException{
        //config-->fs-->fos-->write-->close
        FileSystem fs = FileSystem.get(config);
        Path path2 = new Path(path);
        FSDataOutputStream fos = fs.create(path2);
        fos.write(buf);
        fos.close();
        fs.close();
    }
    public static void createFile(Configuration config,String path,String info)throws IOException{
        //和上面函数的区别是，上面函数传入的是buf数据，下面传入的是String
        createFile(config,path,info.getBytes());
    }
    //把hdfs上的一个文件拷贝到hdfs上另外一个位置
    public static void copyFromLocal(Configuration config,String path1,String path2)throws IOException{
        FileSystem fs = FileSystem.get(config);
        Path p1 = new Path(path1);
        Path p2 = new Path(path2);
        fs.copyFromLocalFile(true, true, p1, p2);  //delSrc, overwrite, srcs, dst
        fs.close();
    }
    //删除hdfs上的某一个文件
    public static boolean deleteFile(Configuration config,String path,boolean mark)throws IOException{
        //config-->fs-->delete-->close-->返回
        FileSystem fs = FileSystem.get(config);
        Path p1 = new Path(path);
        boolean flag = fs.delete(p1, mark);
        fs.close();
        return flag;
    }
    public static boolean deleteFile(Configuration config,String path)throws IOException{
        return deleteFile(config,path,true);
    }
    //修改HDFS上的文件名字String-->PATH-->fs.renameFile
    public static boolean renameFile(Configuration config,String path1,Strign path2){
        //config-->fs-->delete-->close-->返回
        FileSystem fs = FileSystem.get(config);
        Path oldName = new Path(path1);
        Path newName = new Path(path2);
        boolean flag = fs.rename(oldName, newName);
        fs.close();
        return flag;
    }
    //创建hdfs上的一个目录
    public static boolean makeDir(Configuration config,String path)throws IOException{
        FileSystem fs = FileSystem.get(config);
        Path name = new Path(path);
        boolean flag = fs.mkdirs(name);
        fs.close();
        return flag;
    }
    public static RemoteIterator<LocatedFileStatus> listFiles(Configuration config,String path,boolean mark)throws IOException{
        FileSystem fs = FileSystem.get(config);
        RemoteIterator<LocatedFileStatus> r1 = fs.listFiles(new Path(path), mark);
        return r1;
    }
    public static RemoteIterator<LocatedFileStatus> listFiles(Configuration config,String path)throws IOException{
        return listFiles(config,path,false);
    }

    public static FileStatus[] listStatus(Configuration config, String path)throws IOException{
        FileSystem fs = FileSystem.get(config);
        Path path1 = new Path(path);
        FileStatus status[] = fs.listStatus(path1);
        fs.close();
        return status;
    }
    //读取文件内容，返回字符串
    public static String readFile(Configuration config,String path)throws IOException{
        FileSystem fs = FileSystem.get(config);
        String temp;
        InputStream is = null;   //输入流
        ByteArrayOutputStream baos = null;  //输出流
        Path p = new Path(path);
        try{
            is =fs.open(p); //打开输入流
            //输入流输出流对接
            baos = new ByteArrayOutputStream(is.available());
            //把输入流的内容导入到输出流中
            IOUtils.copyBytes(is, baos, config);
            temp = baos.toString(); //把输出流的内容转成字符串
        }
        finally{
            //关闭输入输出流和文件系统
            IOUtils.closeStream(is);
            IOUtils.closeStream(baos);
            fs.close();
        }
        return temp;
    }

    public static boolean createDictionary(Configuration config,String path)throws IOException{
        FileSystem fs = FileSystem.get(config);
        Path p = new Path(path);
        boolean flag = fs.mkdirs(p);
        fs.close();
        return flag;
    }

    public static void main(String[] args) throws IOException{
        Configuration config = new Configuration();
        String dir = "/test";
        boolean flag = exists(config,dir);
        if(flag==true){
            System.out.println("Sorry it's exists");
        }
        else{
            flag = createDictionary(config,dir);
            if(flag==true){
                System.out.println("创建成功");
            }
            else{
                System.out.println("创建失败");
            }
        }
        String info = "北航！！！";
        String fileName = dir+"/buaa.txt";
        createFile(config,fileName,info);
        System.out.println("创建文件成功");
        String message = readFile(config,fileName);
        System.out.println(message);

        FileStatus all[] = listStatus(config,"/");
        for(int i=0;i<all.length;i++){
            System.out.println(all[i]);
        }

        FileSystem fs = FileSystem.get(config);
        RemoteIterator ri = listFiles(config,"/",true);

        while(ri.hasNext()){
            String str = ri.next().toString();
            System.out.println(str);
        }

        deleteFile(config,fileName);
        System.out.println(fileName+"is deleted");
    }
}